home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.4)
-
- '''Handy standard storage machinery
-
- $Id: BaseStorage.py 38774 2005-10-05 19:46:16Z tim_one $
- '''
- import cPickle
- import threading
- import time
- import logging
- from struct import pack as _structpack, unpack as _structunpack
- from persistent.TimeStamp import TimeStamp
- from ZODB import POSException
- from ZODB.utils import z64, oid_repr
- from ZODB.UndoLogCompatible import UndoLogCompatible
- log = logging.getLogger('ZODB.BaseStorage')
-
class BaseStorage(UndoLogCompatible):
    '''Abstract base class that supports storage implementations.

    A subclass must define the following methods:
        load()
        close()
        cleanup()
        lastSerial()
        lastTransaction()

    It must override these hooks:
        _begin()
        _vote()
        _abort()
        _finish()
        _clear_temp()

    If it stores multiple revisions, it should implement:
        loadSerial()
        loadBefore()
        iterator()

    If the subclass wants to implement undo, it should implement the
    multiple revision methods and:
        undo()
        undoInfo()
        undoLog()

    If the subclass wants to implement versions, it must implement:
        abortVersion()
        commitVersion()
        modifiedInVersion()
        versionEmpty()
        versions()

    Each storage has two locks that are accessed via acquire and
    release callables bound to the instance:
        _lock_acquire / _lock_release (reentrant)
        _commit_lock_acquire / _commit_lock_release

    The commit lock is acquired in tpc_begin() and released in
    tpc_abort() and tpc_finish().  It is never acquired with the other
    lock held.

    The other lock appears to protect _oid and _transaction and
    perhaps other things.  It is always held when load() is called, so
    presumably the load() implementation should also acquire the lock.
    '''

    # Transaction currently in two-phase commit, or None when idle.
    _transaction = None
    # Status value passed to the most recent tpc_begin().
    _tstatus = ' '
    # When true, mutating operations raise POSException.ReadOnlyError.
    _is_read_only = False

    def __init__(self, name, base=None):
        '''Set up the locks, the initial timestamp/tid and the oid counter.

        name -- value stored as __name__ and returned by getName() and
                sortKey().
        base -- optional object whose _oid seeds this storage's oid
                counter; when None the counter starts at z64.
        '''
        self.__name__ = name
        log.debug('create storage %s', self.__name__)

        # Reentrant lock guarding instance state.
        lock = threading.RLock()
        self._lock_acquire = lock.acquire
        self._lock_release = lock.release

        # Plain lock held for the duration of a two-phase commit.
        lock = threading.Lock()
        self._commit_lock_acquire = lock.acquire
        self._commit_lock_release = lock.release

        # Same construction as tpc_begin(): year..minute from gmtime plus
        # the (fractional) seconds within the minute.
        now = time.time()
        stamp = self._ts = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
        self._tid = repr(stamp)

        if base is None:
            self._oid = z64
        else:
            self._oid = base._oid

    def abortVersion(self, src, transaction):
        '''Default no-op version abort.

        Raises POSException.StorageTransactionError unless `transaction`
        is the one currently in progress; otherwise returns the current
        tid and an empty list.
        '''
        if transaction is not self._transaction:
            raise POSException.StorageTransactionError(self, transaction)
        return (self._tid, [])

    def commitVersion(self, src, dest, transaction):
        '''Default no-op version commit.

        Raises POSException.StorageTransactionError unless `transaction`
        is the one currently in progress; otherwise returns the current
        tid and an empty list.
        '''
        if transaction is not self._transaction:
            raise POSException.StorageTransactionError(self, transaction)
        return (self._tid, [])

    def close(self):
        '''Do nothing; subclasses are expected to override.'''
        pass

    def cleanup(self):
        '''Do nothing; subclasses are expected to override.'''
        pass

    def sortKey(self):
        '''Return a string that can be used to sort storage instances.

        The key must uniquely identify a storage and must be the same
        across multiple instantiations of the same storage.
        '''
        return self.__name__

    def getName(self):
        '''Return the name given to the constructor.'''
        return self.__name__

    def getSize(self):
        '''Return a rough size figure: len(self) * 300.

        Relies on the subclass supporting len(); this class does not
        define __len__ itself.
        '''
        return len(self) * 300

    def history(self, oid, version, length=1, filter=None):
        '''Default implementation: no history is available (returns None).'''
        pass

    def modifiedInVersion(self, oid):
        '''Return the empty string for every oid.'''
        return ''

    def new_oid(self):
        '''Allocate, record and return the next object id.

        Raises POSException.ReadOnlyError on a read-only storage.
        '''
        if self._is_read_only:
            raise POSException.ReadOnlyError()

        self._lock_acquire()
        try:
            last = self._oid
            low_byte = ord(last[-1])
            if low_byte < 255:
                # Fast path: only the final byte needs to change.
                last = last[:-1] + chr(low_byte + 1)
            else:
                # Carry: treat the oid as a big-endian unsigned 64-bit
                # integer and add one.
                (last_as_long,) = _structunpack('>Q', last)
                last = _structpack('>Q', last_as_long + 1)
            self._oid = last
            return last
        finally:
            self._lock_release()

    def set_max_oid(self, possible_new_max_oid):
        '''Advance the oid counter to `possible_new_max_oid` if it is larger.'''
        self._lock_acquire()
        try:
            if possible_new_max_oid > self._oid:
                self._oid = possible_new_max_oid
        finally:
            self._lock_release()

    def registerDB(self, db, limit):
        '''Do nothing; the arguments are ignored.'''
        pass

    def isReadOnly(self):
        '''Return the read-only flag.'''
        return self._is_read_only

    def supportsUndo(self):
        '''Return 0: undo is not supported by default.'''
        return 0

    def supportsVersions(self):
        '''Return 0: versions are not supported by default.'''
        return 0

    def tpc_abort(self, transaction):
        '''Abort `transaction` if it is the one in progress.

        A transaction other than the current one is silently ignored.
        The commit lock is released even when _abort() or _clear_temp()
        raises.
        '''
        self._lock_acquire()
        try:
            if transaction is not self._transaction:
                return None
            try:
                self._abort()
                self._clear_temp()
                self._transaction = None
            finally:
                self._commit_lock_release()
        finally:
            self._lock_release()

    def _abort(self):
        '''Subclasses should redefine this to supply abort actions'''
        pass

    def tpc_begin(self, transaction, tid=None, status=' '):
        '''Start two-phase commit for `transaction`.

        transaction -- object providing user, description and _extension.
        tid         -- explicit transaction id; when None a fresh one is
                       generated from the clock, forced later than the
                       previous timestamp.
        status      -- stored as _tstatus.

        Calling again with the transaction already in progress is a
        no-op.  Raises POSException.ReadOnlyError on a read-only storage.
        '''
        if self._is_read_only:
            raise POSException.ReadOnlyError()

        self._lock_acquire()
        try:
            if self._transaction is transaction:
                return None

            # Drop the instance lock while waiting for the commit lock so
            # that the commit lock is never acquired with it held, then
            # take it back for the state changes below.
            self._lock_release()
            self._commit_lock_acquire()
            self._lock_acquire()

            self._transaction = transaction
            self._clear_temp()

            user = transaction.user
            desc = transaction.description
            ext = transaction._extension
            if ext:
                ext = cPickle.dumps(ext, 1)
            else:
                ext = ''
            self._ude = (user, desc, ext)

            if tid is None:
                now = time.time()
                stamp = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
                self._ts = stamp = stamp.laterThan(self._ts)
                self._tid = repr(stamp)
            else:
                self._ts = TimeStamp(tid)
                self._tid = tid

            self._tstatus = status
            self._begin(self._tid, user, desc, ext)
        finally:
            self._lock_release()

    def _begin(self, tid, u, d, e):
        '''Subclasses should redefine this to supply transaction start actions.
        '''
        pass

    def tpc_vote(self, transaction):
        '''Run the _vote() hook if `transaction` is the one in progress.'''
        self._lock_acquire()
        try:
            if transaction is not self._transaction:
                return None
            self._vote()
        finally:
            self._lock_release()

    def _vote(self):
        '''Subclasses should redefine this to supply transaction vote actions.
        '''
        pass

    def tpc_finish(self, transaction, f=None):
        '''Finish `transaction` if it is the one in progress.

        f -- optional callable invoked with the tid before _finish().

        Returns the tid on success, or None when `transaction` is not
        the current one.  The pending transaction state is cleared and
        the commit lock released even if the callback or a hook raises.
        '''
        self._lock_acquire()
        try:
            if transaction is not self._transaction:
                return None
            try:
                if f is not None:
                    f(self._tid)
                (u, d, e) = self._ude
                self._finish(self._tid, u, d, e)
                self._clear_temp()
                return self._tid
            finally:
                self._ude = None
                self._transaction = None
                self._commit_lock_release()
        finally:
            self._lock_release()

    def _finish(self, tid, u, d, e):
        '''Subclasses should redefine this to supply transaction finish actions
        '''
        pass

    def undo(self, transaction_id, txn):
        '''Always fail: undo is not supported by default.

        Raises POSException.ReadOnlyError on a read-only storage,
        POSException.UndoError otherwise.
        '''
        if self._is_read_only:
            raise POSException.ReadOnlyError()
        raise POSException.UndoError('non-undoable transaction')

    def undoLog(self, first, last, filter=None):
        '''Return an empty tuple: there is nothing to undo.'''
        return ()

    def versionEmpty(self, version):
        '''Return 1: every version is reported as empty.'''
        return 1

    def versions(self, max=None):
        '''Return an empty tuple: no versions exist.'''
        return ()

    def pack(self, t, referencesf):
        '''Default pack: only enforces the read-only check.

        Raises POSException.ReadOnlyError on a read-only storage and
        otherwise does nothing.
        '''
        if self._is_read_only:
            raise POSException.ReadOnlyError()

    def getSerial(self, oid):
        '''Return the serial that load() reports for `oid`.

        The object is loaded in whatever version modifiedInVersion()
        names, with the instance lock held.
        '''
        self._lock_acquire()
        try:
            version = self.modifiedInVersion(oid)
            (pickledata, serial) = self.load(oid, version)
            return serial
        finally:
            self._lock_release()

    def loadSerial(self, oid, serial):
        '''Always raise POSException.Unsupported.'''
        raise POSException.Unsupported(
            'Retrieval of historical revisions is not supported')

    def loadBefore(self, oid, tid):
        '''Return most recent revision of oid before tid committed.

        Returns (data, start_time, end_time), where start_time is the
        serial of the first history entry older than `tid` and end_time
        is the serial of the entry listed just before it (None if there
        is none).  Returns None when no such entry exists.

        NOTE(review): this presumably relies on history() listing the
        newest revision first -- confirm against the subclass.
        '''
        n = 2
        start_time = None
        end_time = None
        while start_time is None:
            # Only non-version entries are of interest.
            entries = self.history(oid, '', n, (lambda d: not d['version']))
            if not entries:
                return None

            for entry in entries:
                if entry['serial'] < tid:
                    start_time = entry['serial']
                    break
                else:
                    end_time = entry['serial']

            if len(entries) < n:
                # Fewer entries than requested: the history is exhausted.
                break

            # Not found yet; ask for twice as much history next time.
            n *= 2

        if start_time is None:
            return None

        data = self.loadSerial(oid, start_time)
        return (data, start_time, end_time)

    def getExtensionMethods(self):
        '''getExtensionMethods

        This returns a dictionary whose keys are names of extra methods
        provided by this storage. Storage proxies (such as ZEO) should
        call this method to determine the extra methods that they need
        to proxy in addition to the standard storage methods.
        Dictionary values should be None; this will be a handy place
        for extra marshalling information, should we need it
        '''
        return {}

    def copyTransactionsFrom(self, other, verbose=0):
        '''Copy transactions from another storage.

        This is typically used for converting data from one storage to
        another.  `other` must have an .iterator() method.

        other   -- source storage; its iterator yields transactions
                   which in turn yield data records.
        verbose -- when true, print each timestamp and record summary.

        Transaction ids that are not strictly increasing are replaced
        by a later timestamp, with a notice printed when the ordering
        breaks and again when it recovers.  The source iterator is
        closed even if copying fails part-way.
        '''
        _ts = None
        ok = 1
        # oid -> value returned by the previous store() of that oid; only
        # used on the store() path.
        preindex = {}
        preget = preindex.get
        # Records are written with restore() when this storage has one,
        # and with store() otherwise.
        restoring = hasattr(self, 'restore')

        fiter = other.iterator()
        try:
            for transaction in fiter:
                tid = transaction.tid
                if _ts is None:
                    _ts = TimeStamp(tid)
                else:
                    t = TimeStamp(tid)
                    if t <= _ts:
                        if ok:
                            print('Time stamps out of order %s, %s'
                                  % (_ts, t))
                        ok = 0
                        _ts = t.laterThan(_ts)
                        tid = repr(_ts)
                    else:
                        _ts = t
                        if not ok:
                            print('Time stamps back in order %s' % t)
                            ok = 1

                if verbose:
                    print(_ts)

                self.tpc_begin(transaction, tid, transaction.status)
                for r in transaction:
                    oid = r.oid
                    if verbose:
                        print('%s %s %s'
                              % (oid_repr(oid), r.version, len(r.data)))

                    if restoring:
                        self.restore(oid, r.tid, r.data, r.version,
                                     r.data_txn, transaction)
                    else:
                        pre = preget(oid, None)
                        s = self.store(oid, pre, r.data, r.version,
                                       transaction)
                        preindex[oid] = s

                self.tpc_vote(transaction)
                self.tpc_finish(transaction)
        finally:
            fiter.close()
-
-
-
class TransactionRecord:
    '''Abstract base for the transaction-level records of the iterator
    protocol.

    Declares no attributes or methods of its own.
    '''
-
-
class DataRecord:
    '''Abstract base for the data-level records of the iterator protocol.

    Declares no attributes or methods of its own.
    '''
-
-